feat: parquet sharding#4
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
✅ Files skipped from review due to trivial changes (1)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughAdds a new function to split a Parquet file into multiple shard Parquet files by iterating input row-group batches, lazily creating per-shard writers, writing batches until a per-shard row limit is reached, and ensuring writers are closed. Changes
Sequence Diagram(s)sequenceDiagram
participant Caller
participant ParquetFile
participant ShardWriter as ParquetWriter
participant FS as Filesystem
Caller->>ParquetFile: open(input_file)
ParquetFile-->>Caller: metadata (row_count)
loop for each row_group batch
ParquetFile->>Caller: iter_batches -> batch
alt shard writer not created
Caller->>FS: ensure output_dir exists
Caller->>ShardWriter: create writer (shard_x.parquet, schema)
end
Caller->>ShardWriter: write(batch)
Caller->>Caller: accumulate rows_written
alt rows_written >= rows_per_shard
Caller->>ShardWriter: close()
Caller->>Caller: shard_idx++, reset rows_written
end
end
opt open writer remaining
Caller->>ShardWriter: close()
end
Caller-->>FS: log completion
Estimated code review effort🎯 2 (Simple) | ⏱️ ~10 minutes Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a shard_parquet utility designed to split large Parquet files into smaller, manageable shards using memory-efficient streaming. The review feedback suggests several improvements to align with standard practices and enhance robustness: transitioning from os and print to pathlib and logging, refining the function signature to support Path objects, and implementing better resource management using try...finally blocks to ensure file writers are properly closed in case of errors. Additionally, a redundant parameter in the write_batch call was identified for removal.
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rationai/mlkit/data/shard_parquet.py`:
- Line 7: The code currently uses os.makedirs(output_dir, exist_ok=True) and
deterministic shard filenames like shard_00000.parquet which lets reruns
silently overwrite existing shards; change this by adding a pre-check for
existing shard files in output_dir (e.g., glob for "shard_*.parquet") and
fail/raise if any are found or create a uniquely-named output directory
(timestamp/suffix) instead, and remove or set exist_ok=False on
os.makedirs(output_dir, ...) so the guard triggers; apply the same protection
around the shard-writing loop that emits shard_*.parquet (lines that write
shards) to prevent accidental overwrites.
- Line 6: Validate sharding parameters at the start of shard_parquet: check that
rows_per_shard and row_group_size are positive integers (greater than 0) and
raise a clear ValueError if not; include the parameter name and offending value
in the error message so callers immediately see which argument is invalid and
why before any I/O or processing occurs.
- Around line 18-41: The loop over parquet_file.iter_batches currently can skip
writer.close() if iter_batches or writer.write_batch raises; wrap the entire
batching logic in a try/finally (or use a context manager for pq.ParquetWriter)
so that writer.close() is always called when writer is not None; ensure you
reference the same writer variable used when creating
out_path/shard_{shard_idx:05d}.parquet and that current_shard_rows/shard_idx
semantics remain unchanged so partial shards are closed in the finally block on
any exception.
- Around line 27-36: The code currently writes whole batches with
writer.write_batch(batch) then checks current_shard_rows, allowing a shard to
exceed rows_per_shard by a full batch; change write logic in the loop that
handles `batch` so that before calling writer.write_batch you check whether
batch.num_rows + current_shard_rows > rows_per_shard and, if so, split the batch
into smaller pieces and write only up to the remaining rows for the current
shard, close the writer (writer.close(); writer = None), increment shard_idx and
reset current_shard_rows, then create/open a new writer and continue writing the
rest of the batch until the entire batch is consumed; use the existing symbols
`batch`, `batch.num_rows`, `current_shard_rows`, `rows_per_shard`,
`writer.write_batch`, `writer.close`, and `shard_idx` to implement the looped
slicing/writing until all rows are written and no shard exceeds rows_per_shard.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 731d9160-ed15-40c7-b918-46097ee49135
📒 Files selected for processing (1)
rationai/mlkit/data/shard_parquet.py
There was a problem hiding this comment.
Pull request overview
Adds a utility for splitting (“sharding”) a large Parquet file into multiple smaller Parquet files by streaming record batches from the source and writing them into sequential shard files.
Changes:
- Introduces
shard_parquet(...)to stream-read a Parquet file and write out multipleshard_XXXXX.parquetoutputs. - Adds basic progress reporting and shard rollover logic based on
rows_per_shard.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
\gemini review |
|
@CodeRabbit review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Summary by CodeRabbit